package com.xiaohu.transfrom;

import com.xiaohu.bean.WaterSensor;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class FlatMapDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);


        DataStreamSource<WaterSensor> source = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s2", 2L, 2),
                new WaterSensor("s3", 3L, 3),
                new WaterSensor("s1", 4L, 4),
                new WaterSensor("s5", 5L, 5),
                new WaterSensor("s6", 6L, 6)
        );


        // flatMap 一进多出，比较灵活
        /*
                map怎么控制一进一出：因为使用的一个return

                flatMap怎么控制一进多出：通过调用几次Collector，就输出几条
         */
        SingleOutputStreamOperator<String> flatMap = source.flatMap(new FlatMapFunction<WaterSensor, String>() {
            @Override
            public void flatMap(WaterSensor value, Collector<String> out) throws Exception {
                if ("s1".equals(value.getId())) {
                    out.collect(value.getVc().toString()); // 一进一出
                } else if ("s2".equals(value.getId())) {
                    out.collect(value.getTs().toString()); // 一进多出
                    out.collect(value.getVc().toString());
                }
            }
        });

        flatMap.print();


        env.execute();
    }
}
